Skip to content

Conversation

Copy link
Contributor

Copilot AI commented Oct 4, 2025

Overview

This PR implements automatic flow/subplan identification in the physical planner to enable more efficient parallel execution. The planner now identifies chains of operations that can be executed together on a worker without needing to report back interim snapshots.

Problem

The parallel execution engine needs to understand which operations can be grouped together and executed as a unit. Previously, the physical plan was just a DAG of operators without any grouping information, making it difficult to optimize parallel execution and minimize communication overhead between workers.

Solution

Added identify_flows() method to PhysicalPlan that automatically annotates each operator node with a flow_id indicating which flow it belongs to. Flows are linear sequences of stateless operators that can execute together. The algorithm breaks flows at natural boundaries:

  • Stateful nodes (aggregates, sorts, distinct) - require accumulation across data batches
  • Join nodes - require coordination between multiple data sources
  • Branch points - nodes with multiple children (data splits)
  • Merge points - nodes with multiple parents (data merges)

Boundary nodes are marked with flow_id=None to indicate they require special handling.

Example

For a query with joins and aggregations:

SELECT category, COUNT(*) 
FROM table1 t1 
JOIN table2 t2 ON t1.id = t2.id 
WHERE t1.value > 10 
GROUP BY category 
LIMIT 100

The physical plan creates the following flows:

Scan(t1) -> Filter(t1) -> [Flow 0]
Scan(t2) -> Filter(t2) -> [Flow 1]
Join                   -> [boundary: flow_id=None]
Project                -> [Flow 2]
AggregateAndGroup      -> [boundary: flow_id=None]
Limit                  -> [Flow 3]

This allows the parallel engine to:

  • Execute flows 0 and 1 in parallel on separate workers
  • Coordinate at the join boundary
  • Execute flow 2 independently after the join
  • Coordinate at the aggregation boundary
  • Execute flow 3 for the final result

Implementation Details

Algorithm:

  • Uses breadth-first search (BFS) from entry points to ensure parents are processed before children
  • O(N) complexity where N is the number of nodes
  • Processes all nodes exactly once
  • Correctly handles complex plans with multiple branches and joins

Integration:

  • Automatically called in create_physical_plan() after the plan is constructed
  • No breaking changes to existing code
  • Nodes gain a new flow_id attribute that existing code can safely ignore

Benefits

  1. Reduced Communication Overhead - Workers can execute entire flows without reporting interim results
  2. Better Data Locality - Related operations stay together on the same worker
  3. Simplified Coordination - Only boundary nodes require synchronization between workers
  4. Clear Parallelization Strategy - Explicit identification of parallelizable work units
  5. Performance Improvements - Less inter-worker communication and scheduling overhead

Testing

Added comprehensive test suite covering:

  • Linear chains of stateless operators forming single flows
  • Stateful nodes acting as flow boundaries
  • Join scenarios with separate flows for each branch
  • Branch points creating new flows
  • Merge points creating new flows
  • Complex queries with multiple joins and aggregations

All tests verify correct flow identification and proper handling of boundary nodes.

Files Changed

  • opteryx/models/physical_plan.py - Added identify_flows() method with comprehensive documentation
  • opteryx/planner/physical_planner.py - Integrated flow identification into plan creation
  • tests/query_execution/test_flow_identification.py - Full test coverage for all scenarios

Next Steps

The parallel execution engine can now leverage flow information to:

  • Send entire flows to workers as single units of work
  • Schedule independent flows to run in parallel
  • Minimize data transfer between workers by executing flows locally
  • Optimize resource allocation based on flow complexity and dependencies
Original prompt

can you update the physical planner to plan so that chains of the pipeline that are able to be run in parallel are created into "flows" or "subplans", the intention being that we we implement the parallel engine, these chains are sent to a worker to execute together without needing to report back interim snapshots if it doesn't need to.


✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.

@sonarqubecloud
Copy link

sonarqubecloud bot commented Oct 4, 2025

@sonarqubecloud
Copy link

sonarqubecloud bot commented Oct 4, 2025

Copilot AI changed the title can you update the physical planner to plan so that chains of the pipeline that are able to be run in parallel are created into "flows" or "subplans", the intention being that we we implement the parallel engine, these chains are sent to a worker to ex... Add flow/subplan identification to physical planner for parallel execution Oct 4, 2025
Copilot AI requested a review from joocer October 4, 2025 19:15
Copilot finished work on behalf of joocer October 4, 2025 19:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants